package com.bw.gmall.realtime.dim.function;

import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.bean.TableProcessDim;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.client.Connection;

/**
 * Flink sink that applies dimension change records to HBase.
 *
 * <p>Each element pairs a change record ({@code f0}) with the {@link TableProcessDim}
 * configuration row ({@code f1}) that supplies the target table, column family and the name of
 * the column used as the HBase row key. A record whose {@code type} is {@code "delete"} removes
 * the row; any other type writes the record's {@code data} object with a put.
 *
 * <p>Records that carry no {@code data} object, or whose row-key column is missing or empty, are
 * skipped with a warning on stderr instead of failing the task.
 *
 * @Author sw
 * @CreateTime 2024-11-14
 */
public class DimSinkFunction extends RichSinkFunction<Tuple2<JSONObject, TableProcessDim>> {

    /** JSON key holding the row payload of a change record. */
    private static final String KEY_DATA = "data";
    /** JSON key holding the operation type of a change record. */
    private static final String KEY_TYPE = "type";
    /** Operation type that triggers a row delete instead of a put. */
    private static final String TYPE_DELETE = "delete";

    /**
     * HBase connection created in {@link #open(Configuration)}. Marked transient because it is a
     * runtime resource and must not be treated as part of the function's serializable state.
     */
    private transient Connection hbaseConnect;

    /**
     * Obtains the HBase connection used by every subsequent {@code invoke} call.
     *
     * @param parameters Flink configuration (not read here)
     * @throws Exception if the connection cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        hbaseConnect = HbaseUtil.getHbaseConnect();
    }

    /**
     * Writes or deletes one dimension row in HBase.
     *
     * @param value   {@code f0}: change record with {@code type} and {@code data} keys;
     *                {@code f1}: configuration describing the sink table, family and row-key column
     * @param context sink context (not used)
     * @throws Exception if the HBase operation fails
     */
    @Override
    public void invoke(Tuple2<JSONObject, TableProcessDim> value, Context context) throws Exception {
        JSONObject record = value.f0;
        TableProcessDim config = value.f1;

        // Payload that is written to HBase; also the source of the row-key value.
        JSONObject data = record.getJSONObject(KEY_DATA);
        String rowKeyColumn = config.getSinkRowKey();
        String rowKeyValue = data == null ? null : data.getString(rowKeyColumn);

        // Without a row key there is nothing HBase can address. Skip the record rather than
        // dereferencing null or handing HBase an unusable key.
        if (rowKeyValue == null || rowKeyValue.isEmpty()) {
            System.err.println("Skipping dimension record without row key: sourceTable="
                    + config.getSourceTable() + ", rowKeyColumn=" + rowKeyColumn);
            return;
        }

        System.out.println("正在写入维度数据" + config.getSourceTable());

        String sinkTable = config.getSinkTable();
        if (TYPE_DELETE.equals(record.getString(KEY_TYPE))) {
            HbaseUtil.deleteCells(hbaseConnect, Constant.HBASE_NAMESPACE, sinkTable, rowKeyValue);
        } else {
            HbaseUtil.putCells(hbaseConnect, Constant.HBASE_NAMESPACE, sinkTable, rowKeyValue,
                    config.getSinkFamily(), data);
        }
    }

    /**
     * Releases the HBase connection, if one was obtained.
     *
     * @throws Exception if closing the connection fails
     */
    @Override
    public void close() throws Exception {
        // open() may have failed before the connection was assigned.
        if (hbaseConnect != null) {
            HbaseUtil.closeHBaseConn(hbaseConnect);
        }
    }
}
